-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Propagate exceptions throughout transactions #367
Propagate exceptions throughout transactions #367
Conversation
return new Single<>(requestID, queue, this); | ||
} | ||
|
||
public Stream<ResPart> stream(Req.Builder request) { | ||
UUID requestID = UUID.randomUUID(); | ||
ResponseCollector.Queue<ResPart> collector = resPartCollector.queue(requestID); | ||
dispatcher.dispatch(request.setReqId(UUIDAsByteString(requestID)).build()); | ||
ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, dispatcher); | ||
ResponsePartIterator iterator = new ResponsePartIterator(requestID, collector, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now add the stream to the iterator/Single so they can call back when they are finished in order to clean up
|
||
public List<StatusRuntimeException> drainErrors() { | ||
List<StatusRuntimeException> errors = new ArrayList<>(resCollector.drainErrors()); | ||
errors.addAll(resPartCollector.drainErrors()); | ||
void singleDone(UUID requestID) { | ||
resCollector.remove(requestID); | ||
} | ||
|
||
void iteratorDone(UUID requestID) { | ||
resPartCollector.remove(requestID); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
separate methods for separate collectors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a fairly general comment, not limited to this change:
How aligned are the implementations of tx exception propagation in the 3 clients?
I'm pretty sure client-python had just one done
method here, and client-nodejs doesn't appear to have any.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah python has only one collector for all single and iterator, so it doesn't need split methods. I'm going to go back to update client-nodejs so it's in one of these two styles next (but won't release it)
RequestTransmitter.Dispatcher dispatcher() { | ||
return dispatcher; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we've replaced the ResPartIterator
argument which is dispatcher
with a getter for dispatcher()
on the stream, which the Iterator now has access to
public void close(@Nullable StatusRuntimeException error) { | ||
responseQueue.add(Either.second(new Done(error))); | ||
this.error = error; | ||
responseQueue.add(Either.second(new Done())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hold on to error to be returned later like in node
What is the goal of this PR?
We align with Client NodeJS version 2.6.1 (mostly the work in typedb/typedb-driver-nodejs#197), which implements a better error propagation mechanism: when an exception occurs, we store it against all the transaction's active transmit queues to retrieve whenever the user tries to perform an operation in the transaction anywhere, ensuring the user always sees the originating error.
What are the changes implemented in this PR?
transaction is closed with errors
, which throws the errors from all queues (note that this can be duplicate if there are multiple open transmit queues that have been given the same error)ResponseCollector
, calling internal collectorsresponse_queues
instead ofcollectors
(a collector made of collectors is a bit strange)